/*
 * SPDX-License-Identifier: MIT
 * Copyright (c) Huawei Technologies Co., Ltd. 2021-2025. All rights reserved.
 * File Name     : dlock_server.h
 * Description   : dlock server
 * History       : create file & add functions
 * 1.Date        : 2021-06-15
 * Author        : zhangjun
 * Modification  : Created file
 */

#ifndef __DLOCK_SERVER_H__
#define __DLOCK_SERVER_H__

#include <arpa/inet.h>
#include <pthread.h>
#include <vector>
#include <queue>
#include <shared_mutex>
#include <set>
#include <chrono>

#include "dlock_types.h"
#include "dlock_common.h"
#include "client_entry_s.h"
#include "lock_entry_s.h"
#include "object_entry_s.h"
#include "server_node.h"
#include "dlock_descriptor.h"
#include "object_memory.h"
#include "dlock_log.h"

namespace dlock {
#define MEASURE_ENABLE 0

class dlock_server_mgr;

using lock_map_t = std::unordered_map<int32_t, lock_entry_s*>;
using object_map_t = std::unordered_map<int32_t, object_entry_s*>;

using client_map_t = std::unordered_map<int32_t, client_entry_s*>;
using lock_desc_map_t = std::unordered_map<dlock_descriptor*, lock_entry_s*, hash_dlock_desc, equal_dlock_desc>;
using object_desc_map_t = std::unordered_map<dlock_descriptor*, object_entry_s*, hash_dlock_desc, equal_dlock_desc>;
using connection_map_t = std::unordered_map<int, dlock_connection*>;
using jetty_mgr_map_t = std::unordered_map<uint32_t, jetty_mgr*>;
using exception_client_set_t = std::set<int32_t>;

constexpr size_t MAX_OBJECT_SIZE = 102400;  /* maximum alive object number */

struct jetty_mgr_invalid_info {
    jetty_mgr *p_jetty_mgr;
    std::chrono::steady_clock::time_point invalid_time_point;
};

class dlock_server {
    friend class dlock_server_mgr;
public:
    dlock_server() = delete;
    explicit dlock_server(int server_id) noexcept;
    ~dlock_server();
    int init(const struct server_cfg &cfg);
    void deinit();
    int init_as_primary(const struct server_cfg &cfg);
    int launch();
    void quit();
    int primary_control_func(int control_epfd, int ev_fd);
    int primary_control_loop();
    int primary_preinit_func(const urma_cr_t &cr) const;
    int primary_cmd_handler();
    int init_client_do(dlock_connection *p_conn, struct dlock_control_hdr* msg_hdr, uint8_t *msg_body);
    int reinit_client_do(dlock_connection *p_conn, struct dlock_control_hdr* msg_hdr, uint8_t *msg_body);
    int deinit_client_do(dlock_connection *p_conn, struct dlock_control_hdr* msg_hdr, uint8_t* /* msg_body */);
    int client_heartbeat_do(dlock_connection *p_conn, struct dlock_control_hdr* msg_hdr, uint8_t * /* msg_body */);
    int get_lock_do(dlock_connection * /* p_conn */, struct dlock_control_hdr* msg_hdr, uint8_t *msg_body);
    int release_lock_do(dlock_connection *p_conn, struct dlock_control_hdr *msg_hdr, uint8_t *msg_body);
    int batch_get_lock_do(dlock_connection * /* p_conn */, struct dlock_control_hdr* msg_hdr, uint8_t *msg_body);
    void release_lock_map_entry(const lock_map_t::iterator lock_iter,
        const struct release_lock_body release_msg, int32_t client_id);
    int batch_release_lock_do(dlock_connection *p_conn, struct dlock_control_hdr* msg_hdr, uint8_t *msg_body);
    int reinit_client_done(dlock_connection *p_conn, struct dlock_control_hdr* msg_hdr, uint8_t* /* msg_body */);
    int batch_update_locks_do(dlock_connection *p_conn, struct dlock_control_hdr* msg_hdr, uint8_t *msg_body);
    int primary_set_affinity(char *cpuset, enum thread_type type);
    int create_object_do(dlock_connection * /* p_conn */, struct dlock_control_hdr* msg_hdr, uint8_t *msg_body);
    int get_object_do(dlock_connection * /* p_conn */, struct dlock_control_hdr* msg_hdr, uint8_t *msg_body);
    int release_object_do(dlock_connection * /* p_conn */, struct dlock_control_hdr* msg_hdr, uint8_t *msg_body);
    int destroy_object_do(dlock_connection * /* p_conn */, struct dlock_control_hdr* msg_hdr, uint8_t *msg_body);
    struct debug_stats m_stats;

    inline void erase_from_m_jetty_mgr_map(uint32_t local_id)
    {
        std::unique_lock<std::shared_mutex> locker(m_jetty_mgr_map_rwlock);
        m_jetty_mgr_map.erase(local_id);
    }

    inline void add_to_m_jetty_mgr_map(uint32_t local_id, jetty_mgr *p_jetty_mgr)
    {
        std::unique_lock<std::shared_mutex> locker(m_jetty_mgr_map_rwlock);
        jetty_mgr_map_t::iterator jetty_mgr_iter = m_jetty_mgr_map.find(local_id);
        if (jetty_mgr_iter != m_jetty_mgr_map.end()) {
            /*
             * The local_id is jetty/jfs id, generated by the UB device and is expected to be unique.
             * If it is duplicated, dlock cannot handle this exception. Exception handling, as well as
             * the processes of modifying and deleting Jetty/JFS, also require the use of jetty/jfs id.
             * Therefore, this is just a precautionary measure to report an error when something goes wrong,
             * but no exception handling will be performed.
             */
            DLOCK_LOG_ERR("Duplicate local_id: %u !", local_id);
        }

        m_jetty_mgr_map[local_id] = p_jetty_mgr;
    }

private:
    void unregister_lock_mem_dma_tseg(void);
    void unregister_obj_mem_dma_tseg(void);
    void delete_exe_jfc(void);
    void clear_m_client_map(void);
    void clear_m_jetty_mgr_map(void);
    void clear_m_lock_map(void);
    void clear_m_object_map(void);
    void clear_m_fd2conn_map(void);
    void deinit_server_proc(void);
    int check_recv_cr_status(urma_cr_t *cr, int idx, bool ssl_enable);
    int init_server(bool is_primary, const struct server_cfg &cfg);
    int primary_get_addr_and_ports(const struct server_cfg &cfg,
        struct in_addr &ip_addr, uint16_t &server_port) const;
    int create_listen_fd(const struct in_addr &ip_addr, uint16_t port, int &listen_fd);
    int recv_msg_hdr(dlock_connection *p_conn, struct dlock_control_hdr *msg_hdr);
    int recv_msg_ext_hdr_and_body(dlock_connection *p_conn,
        uint8_t ext_hdr_len, uint16_t body_len, uint8_t **msg_ext_hdr, uint8_t **msg_body);
    void free_msg_ext_hdr_and_body_recv_buf(uint8_t *msg_ext_hdr, uint8_t *msg_body) const;
    int process_control_msg(dlock_connection *p_conn, uint8_t min_type, uint8_t max_type);
    void process_control_msg_err(struct dlock_control_hdr &msg_hdr,
        dlock_connection *p_conn, int32_t ret_status);
    int do_lock(struct urma_buf *p_rx_buf, uint32_t msg_len);
    int update_client(int32_t client_id, dlock_connection *p_conn, jetty_mgr *p_jetty_mgr, bool reinit_flag);
    jetty_mgr *init_client_primary(struct urma_init_body *jetty_info, bool /* reinit_flag */);
    int init_client_response(dlock_connection *p_conn, int32_t client_id, jetty_mgr *p_jetty_mgr, bool reinit_flag);
    int get_client_id();
    int find_available_lock_id(int lock_id);
    int find_available_object_id(int object_id);
    int get_lock_reply(dlock_connection *p_conn, struct get_lock_body *msg_body) const;
    void lock_entry_release(lock_entry_s *lock_entry, const struct release_lock_body *release_msg);
    lock_entry_s* get_lock_by_msg(struct get_lock_body *get_msg);
    int batch_get_lock_reply(dlock_connection *p_conn, uint32_t lock_num, uint8_t *msg_body) const;
    void preprocess_lock_cmd_msg(struct lock_cmd_msg *msg, uint32_t cmd_num);
    void process_urma_cr_local_jfs(const urma_cr_t &cr) const;
    lock_entry_s *update_lock_by_msg(struct update_lock_body *update_msg);
    void get_process_control_msg_range(uint8_t &min_type, uint8_t &max_type) const;
    void server_mode_handler();
    void get_primary_affinity(enum thread_type type) const;
    int set_thread_affinity() const;
    int client_num_count_down();
    void init_primary_server_state(void);
    void modify_response_with_fairlock_ticket(struct lock_cmd_msg *msg, uint32_t cmd_num,
        uint32_t ticket_obtain_time) const;
    void modify_response_with_fairlock_ticket(struct lock_cmd_msg *msg, uint32_t cmd_num) const;
    void conn_exception_process(dlock_connection *p_conn);
    int modify_jetty_mgr_to_busy(jetty_mgr *p_jetty_mgr) const;
    int modify_jetty_mgr_to_active(jetty_mgr *p_jetty_mgr) const;
    int modify_jetty_mgr_to_invalid(jetty_mgr *p_jetty_mgr);
    void delete_invalid_jetty_mgr(void);
    void clear_m_jetty_mgr_invalid_queue(void);
    void mark_client_exception(int32_t client_id, dlock_connection *p_conn);
    void clear_lock_client_relation(int client_id, client_entry_s *client_entry);
    void clear_object_client_relation(int client_id, client_entry_s *client_entry);
    int delete_except_client_entry();
    int check_control_msg_client_id(const struct dlock_control_hdr msg_hdr, dlock_connection *p_conn) const;
    void fake_client_msg_process(dlock_connection *p_conn);
    int check_msg_type_range(const struct dlock_control_hdr msg_hdr, const uint8_t min_type, const uint8_t max_type,
        int32_t *ret_status) const;
    int check_control_msg_hdr(const struct dlock_control_hdr &msg_hdr) const;
    dlock_status_t read_entropy_pool(int fd, uint8_t *seed, int seed_len) const;
    dlock_status_t set_random_seed() const;
    void delete_sockfd(int sockfd) const;
    void delete_dlock_connection(dlock_connection *p_conn);
    int negotiate_proto_version(const struct client_init_req_body &req_body) const;
    int check_cmd_msg_common_field(const struct lock_cmd_msg &msg) const;

    object_entry_s* create_object_by_msg(struct object_create_body *body, int32_t client_id);
    void destroy_object_entry(object_entry_s *entry);
    void refresh_object_entry(object_entry_s *entry, struct dlock_control_hdr *msg_hdr,
        std::chrono::seconds lease_time);
    template <typename T>
    int object_reply(dlock_connection *p_conn, T *body);
#if defined(MEASURE_ENABLE) && (MEASURE_ENABLE != 0)
    void measure_throughput(void);
#endif

    pthread_t m_control_tid;
    pthread_t m_cmd_tid;
    bool m_is_primary;
    client_map_t m_client_map;
    lock_map_t m_lock_map;
    lock_desc_map_t m_lock_desc_map;
    exception_client_set_t m_except_client_set;
    urma_ctx *m_p_urma_ctx;
    urma_jfc_t *m_exe_jfc;
    server_node *m_primary;
    int m_next_client_id;
    int m_curr_lock_id;
    size_t m_recv_buf_size;
    uint8_t *m_recv_buff;
    int m_server_id;
    bool m_stop;
    lock_memory *m_lock_memory;
    urma_target_seg_t *m_lock_mem_dma_tseg; /* Exported target segment for lock memory read/write operation */
    urma_token_t m_lock_mem_tseg_token;
    unsigned int m_recovery_client_num;
    int m_listen_fd;
    connection_map_t m_fd2conn_map;
    bool m_ssl_enable;
    ssl_init_attr_t m_ssl_init_attr;
    bool m_is_cpu_ctrl_affnty_set;
    bool m_is_cpu_cmd_affnty_set;
    cpu_set_t m_ctrl_cpuset;
    cpu_set_t m_cmd_cpuset;
    std::queue<struct jetty_mgr_invalid_info> m_jetty_mgr_invalid_queue;

    struct timeval m_time_previous;
    struct timeval m_time_current;
    int m_num_reqs;
    bool m_sleep_cfg_enable;
    bool m_sleep_mode;
    int m_client_num;
    trans_mode_t m_tp_mode;
    dlock_server_state_t m_server_state;
    int m_control_epfd;
    int m_lock_num;
#if defined(MEASURE_ENABLE) && (MEASURE_ENABLE != 0)
    struct timeval m_tv_start;
    uint32_t m_measure_count;
#endif

    object_desc_map_t m_object_desc_map;
    object_map_t m_object_map;
    object_memory *m_object_memory;
    urma_target_seg_t *m_obj_mem_dma_tseg; /* Exported target segment for object memory faa/cas and read operation. */
    urma_token_t m_obj_mem_tseg_token;
    int m_curr_object_id;
    int m_curr_object_num;

    jetty_mgr_map_t m_jetty_mgr_map;
    std::shared_mutex m_jetty_mgr_map_rwlock;
};
};
#endif
